1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package rx.internal.operators;
17
18 import static org.mockito.Matchers.any;
19 import static org.mockito.Mockito.mock;
20 import static org.mockito.Mockito.times;
21 import static org.mockito.Mockito.verify;
22
23 import java.util.concurrent.CountDownLatch;
24
25 import org.junit.Test;
26 import org.mockito.Mockito;
27
28 import rx.Observable;
29 import rx.Observer;
30 import rx.functions.Func1;
31 import rx.internal.util.RxRingBuffer;
32 import rx.observers.TestSubscriber;
33
34 public class OperatorFilterTest {
35
36 @Test
37 public void testFilter() {
38 Observable<String> w = Observable.just("one", "two", "three");
39 Observable<String> observable = w.filter(new Func1<String, Boolean>() {
40
41 @Override
42 public Boolean call(String t1) {
43 return t1.equals("two");
44 }
45 });
46
47 @SuppressWarnings("unchecked")
48 Observer<String> observer = mock(Observer.class);
49 observable.subscribe(observer);
50 verify(observer, Mockito.never()).onNext("one");
51 verify(observer, times(1)).onNext("two");
52 verify(observer, Mockito.never()).onNext("three");
53 verify(observer, Mockito.never()).onError(any(Throwable.class));
54 verify(observer, times(1)).onCompleted();
55 }
56
57
58
59
60 @Test(timeout = 500)
61 public void testWithBackpressure() throws InterruptedException {
62 Observable<String> w = Observable.just("one", "two", "three");
63 Observable<String> o = w.filter(new Func1<String, Boolean>() {
64
65 @Override
66 public Boolean call(String t1) {
67 return t1.equals("three");
68 }
69 });
70
71 final CountDownLatch latch = new CountDownLatch(1);
72 TestSubscriber<String> ts = new TestSubscriber<String>() {
73
74 @Override
75 public void onCompleted() {
76 System.out.println("onCompleted");
77 latch.countDown();
78 }
79
80 @Override
81 public void onError(Throwable e) {
82 e.printStackTrace();
83 latch.countDown();
84 }
85
86 @Override
87 public void onNext(String t) {
88 System.out.println("Received: " + t);
89
90 request(1);
91 }
92
93 };
94
95 ts.requestMore(2);
96
97 o.subscribe(ts);
98
99
100 latch.await();
101 }
102
103
104
105
106 @Test(timeout = 500000)
107 public void testWithBackpressure2() throws InterruptedException {
108 Observable<Integer> w = Observable.range(1, RxRingBuffer.SIZE * 2);
109 Observable<Integer> o = w.filter(new Func1<Integer, Boolean>() {
110
111 @Override
112 public Boolean call(Integer t1) {
113 return t1 > 100;
114 }
115 });
116
117 final CountDownLatch latch = new CountDownLatch(1);
118 final TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
119
120 @Override
121 public void onCompleted() {
122 System.out.println("onCompleted");
123 latch.countDown();
124 }
125
126 @Override
127 public void onError(Throwable e) {
128 e.printStackTrace();
129 latch.countDown();
130 }
131
132 @Override
133 public void onNext(Integer t) {
134 System.out.println("Received: " + t);
135
136 request(1);
137 }
138 };
139
140 ts.requestMore(1);
141
142 o.subscribe(ts);
143
144
145 latch.await();
146 }
147 }